Cloud Composer で Deferrable Operators を使って BigQuery ジョブの完了を待機してみた #cm_google_cloud_adcal_2024

Cloud Composer で Deferrable Operators を使って BigQuery ジョブの完了を待機してみた #cm_google_cloud_adcal_2024

Cloud Composer を使って BigQuery ジョブを実行する際に、待機時間が長いジョブに直面したことはないでしょうか?
Clock Icon2024.12.23

こんにちは!エノカワです。

この記事はクラスメソッドの Google Cloud アドベントカレンダー2024 の 23日目の記事です。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

試したこと

今回は、Deferrable Operators を活用して BigQuery ジョブの完了を待機し、その動作の違いや利点を確かめてみました。
Deferrable Operators を使うと外部リソースの待機中に Worker を解放できるため、大幅にリソース効率を向上できます。

Deferrable Operators とは

Deferrable Operators は、Airflow 2.2 以降で導入された機能です。
イベント駆動型のアーキテクチャを採用することにより、外部リソースの待機中は Worker リソースをほとんど使わずに済む のが最大の特徴です。

以下のような課題に対して有効です。

  • BigQuery や外部 API 呼び出しなど、ジョブの完了待ち時間が長い
  • Worker ノードのリソースを節約したい
  • 多数のタスクを並列に実行したい

Airflow では、ポーリング(一定間隔で状態を確認)するようなオペレーターで長時間待機が発生すると、タスクが Running のまま Worker を占有してしまいがちです。
しかし、Deferrable Operators を利用すると、待機の間は Worker を解放 (Deferred 状態) するため、リソースの有効活用が期待できます。

Google Cloud がサポートする Deferrable Operators

Google Cloud が提供する演算子の中には、遅延可能モード(Deferrable)をサポートするものがあります。
具体的な一覧や詳細は、公式ドキュメントをご参照ください。

今回、Google Cloud がサポートする Deferrable Operators の一例である BigQueryInsertJobOperator を使用し、BigQuery ジョブの実行効率を比較します。

環境作成

DAG を動かすための Cloud Composer 環境を作成します。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから、Composer 3 を選択します。

cloud-composer-deferrable-operators-bigquery_01.png

test-composerという名前の環境を、東京リージョンで作成します。
サービスアカウントは本番運用を考えるとユーザーマネージドが推奨ですが、今回は検証用なのでデフォルトのものを使用しています。

cloud-composer-deferrable-operators-bigquery_02.png

cloud-composer-deferrable-operators-bigquery_03.png

DAG を作成する

今回は、約 1 分ほど処理がかかるクエリを実行する BigQuery ジョブを並列して実行させ、通常オペレーター版Deferrable Operator 版 の 2 つの DAG を用意しました。

以下は、通常のオペレーターと Deferrable Operator を比較するための DAG の例です。

compare_deferrable_operators.py
from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# 約1分の待機処理を含むクエリ
SLEEP_QUERY = """
BEGIN
  DECLARE x INT64 DEFAULT 0;
  REPEAT
    SET x = x + 1;
    UNTIL x >= 3000  -- 約1分の待機
  END REPEAT;
END;
"""

# 通常のオペレーターを使用するDAG
with DAG(
    "bigquery_insert_normal",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    max_active_tasks=1,
) as dag_normal:

    # 1から5までの通番でタスクを作成
    for i in range(1, 6):
        insert_job_normal = BigQueryInsertJobOperator(
            task_id=f"insert_job_normal_{i}",
            configuration={
                "query": {
                    "query": SLEEP_QUERY,
                    "useLegacySql": False
                }
            }
        )

# Deferrable Operatorを使用するDAG
with DAG(
    "bigquery_insert_deferrable",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    max_active_tasks=1,
) as dag_deferrable:

    # 1から5までの通番でタスクを作成
    for i in range(1, 6):
        insert_job_async = BigQueryInsertJobOperator(
            task_id=f"insert_job_async_{i}",
            configuration={
                "query": {
                    "query": SLEEP_QUERY,
                    "useLegacySql": False
                }
            },
            deferrable=True
        )
  • SLEEP_QUERY では、簡易的に約 1 分間の待機を作っています
  • max_active_tasks=1 により、同時に実行されるタスク数を 1 に抑えています(Deferrable Operators でも Worker 解放のメリットがある点を確認するため)
  • Deferrable Operator を使用するには、オペレーターの引数に deferrable=True を設定する必要があります

DAG を実行する

通常のオペレーターを使用するDAG

まずは 通常のオペレーターを使用するDAG を実行した様子を見ます。

cloud-composer-deferrable-operators-bigquery_04.png

1 つのタスクだけが Running 状態となり、他のタスクは実行待ち (scheduled) のまま順番に処理されていくのがわかります。
BigQuery ジョブを実行しながら約 1 分間の待機が入るため、タスク 5 つ分だと合計で 5 分以上を要する見込みです。
待機中もタスクが Running のままのため、Worker リソースは常に占有され、ほかのタスクが同時に実行されにくい状態になってしまいます。

cloud-composer-deferrable-operators-bigquery_05.png

DAG 実行後、ガントチャートを見ると、タスクが完全に直列で実行されていることがわかります。
もちろん max_active_tasks=1 の設定に依存しますが、待機中もタスクは Running のままで、 Worker が占有されます。

Deferrable Operator を使用するDAG

次に Deferrable Operator を使用するDAG を実行します。

cloud-composer-deferrable-operators-bigquery_06.png

今度はタスクが順次 Deferred 状態へ移行し、最終的には全タスクが同時に Deferred のまま待機している様子が確認できます。
タスクが Deferred 中は Worker リソースが解放されるため、実際には 5 つのタスクを同時に仕掛けていても、待機に伴うリソースの占有がありません。
そのため、DAG 全体の所要時間は 1 つのタスクが終わるまでとほぼ変わらず、効率よく処理が進みます。

cloud-composer-deferrable-operators-bigquery_07.png

ガントチャートを見ると、5つのタスクがほぼ同時に実行され、待機中は Worker のリソースを消費せずに待っているため、効率よく処理されている様子がわかります。

まとめ

Cloud Composer 上で Deferrable Operators を活用すると、BigQuery ジョブなど外部リソースへの長時間待機を効率化できることがわかりました。
特に以下のようなメリットを得られます。

  • リソース効率の向上
    待機中は Deferred 状態になり、Worker リソースが解放されるため、Cloud Composer のノードリソースを節約できます。
  • タスクのスループット向上
    同時実行したタスクがすべて待機中でも、リソースの奪い合いが発生しにくいため、より多くのタスクを並列実行できます。
  • コスト削減
    リソース占有時間を最小化でき、必要に応じてスケールダウンもしやすいので、結果的に Cloud Composer の運用コストも抑えられます。

具体的には、BigQuery ジョブの完了待機や長時間かかる外部 API の監視、ファイルアップロード完了待ち など、実行結果の返却が遅延するシナリオに最適です。
もし、大規模なワークフローや多数のジョブでリソースの枯渇を感じているようであれば、Deferrable Operators をぜひ試してみてください。

明日 12/24 は 和田祐介 さんです。よろしくお願いします!

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.